package cn.doitedu.rtmk.mgmt.groovy

import cn.doitedu.common.interfaces.RuleCalculator
import cn.doitedu.common.pojo.EventBean
import cn.doitedu.common.utils.UserEventComparator
import com.alibaba.fastjson.JSONObject
import groovy.util.logging.Slf4j
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.api.common.state.MapState
import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.util.Collector
import org.roaringbitmap.RoaringBitmap
import redis.clients.jedis.Jedis

class Model001Calculator implements RuleCalculator {

    MapState<String, Integer> state;
    Jedis jedis;
    JSONObject paramJsonObject;
    String ruleInstanceId;
    JSONObject resObj;
    RoaringBitmap roaringBitmap;
    boolean  isInitialed = false;

    /**
     * 初始化方法
     * @param runtimeContext
     */
    @Override
    void open(RuntimeContext runtimeContext, JSONObject paramJsonObject, RoaringBitmap roaringBitmap) {
        state = runtimeContext.getMapState(new MapStateDescriptor<String, Integer>("dynamic_tag_state", String.class, Integer.class))
        jedis = new Jedis("doitedu", 6379)
        this.paramJsonObject = paramJsonObject;
        this.roaringBitmap = roaringBitmap

        ruleInstanceId = paramJsonObject.getString("rule_instance_id")
        resObj = new JSONObject()
        isInitialed = true;
    }

    @Override
    public boolean isInitialed(){
        return this.isInitialed
    }


    @Override
    void processEvent(EventBean eventBean, Collector<String> out) {
        //  只有属于静态人群圈选的，才进行处理
        if (roaringBitmap.contains(eventBean.getUserId())) {
            def fireEvent = paramJsonObject.getJSONObject("fire_condition")
            // 取出规则参数中的动态画像条件数组
            def dynamicProfileConditions = paramJsonObject.getJSONArray("dynamic_profile_condition")

            // 判断eventBean事件是否是本规则实例中定义的触发事件
            if (UserEventComparator.userEventIsEqualParam(eventBean, fireEvent)) {
                // 检查各个动态画像标签条件是否已经都满足
                boolean res = true;
                for (int i = 0; i < dynamicProfileConditions.size(); i++) {
                    JSONObject dynamicProfileCondition = dynamicProfileConditions.getJSONObject(i)
                    int conditionId = dynamicProfileCondition.getInteger("condition_id")
                    int conditionValue = dynamicProfileCondition.getInteger("cnt")
                    // 取出当前用户当前动态标签条件的真实统计值
                    int realValue = state.get(ruleInstanceId + ":" + conditionId)
                    // 比较真实值和条件阈值
                    if (realValue < conditionValue) {
                        res = false
                        break
                    }
                }
                if (res) {
                    resObj.put("user_id", eventBean.getUserId())
                    resObj.put("rule_instance_id", ruleInstanceId)
                    resObj.put("trigger_time", eventBean.getEventTime())
                    out.collect(resObj.toJSONString())
                }
            } else {
                // 遍历动态画像条件
                for (int i = 0; i < dynamicProfileConditions.size(); i++) {
                    // 判断当前的用户行为事件，是否匹配了动态画像条件中的事件
                    def dynamicProfileCondition = dynamicProfileConditions.getJSONObject(i)
                    if (UserEventComparator.userEventIsEqualParam(eventBean, dynamicProfileCondition)) {
                        // 判断状态中是否有该条件的值
                        def oldValue = state.get(ruleInstanceId + ":" + dynamicProfileCondition.getInteger("condition_id"))

                        // 如果值为空，则去redis中查询初始值
                        if (oldValue == null) {
                            String redisValueStr = jedis.get(paramJsonObject.getString("rule_instance_id") + ":" + eventBean.getUsrId() + ":" + dynamicProfileCondition.getInteger("condition_id"))
                            oldValue = redisValueStr == null ? 0 : Integer.parseInt(redisValueStr)
                        }

                        // 向状态中更新
                        state.put(paramJsonObject.getString("rule_instance_id") + ":" + dynamicProfileCondition.getInteger("condition_id"), oldValue + 1)
                    }
                }
            }
        }
    }
}
